-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add kyve support #235
Add kyve support #235
Conversation
Coverage report
Test suite run success58 tests passing in 7 suites. Report generated by 🧪jest coverage report action from e138dc5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this work when workers are enabled? It doesn't seem to make sense that each worker download the same bundles
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When testing this it never seems to use Kyve, only RPC
…er api. update based on review
…sts for new log injector
packages/node/src/utils/kyve/kyve.ts
Outdated
bundleId = | ||
Math.max(...nearestBundle.map((b) => parseDecimal(b.id))) + 1; | ||
|
||
this.cachedBundleDetails[bundleId] = this.getBundleById(bundleId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assignment is redundant
packages/node/src/utils/kyve/kyve.ts
Outdated
await Promise.race([ | ||
new Promise((resolve, reject) => { | ||
writeStream.on('open', resolve); | ||
writeStream.on('error', reject); | ||
}), | ||
delay(5).then(() => { | ||
throw new Error('Timeout: File stream did not open'); | ||
}), | ||
]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a timeout function, but to me there is a bigger issue that this promise doesn't resolve. It should always resolve
packages/node/src/utils/kyve/kyve.ts
Outdated
limit: '1', | ||
}, | ||
}) | ||
).pagination.total, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems a little fragile, why not set reverse
on the pagination and get the most recent?
We can also cache the finalized bundle
packages/node/src/utils/kyve/kyve.ts
Outdated
bundleId = | ||
Math.max(...nearestBundle.map((b) => parseDecimal(b.id))) + 1; | ||
|
||
bundle = await this.getBundleById(bundleId); | ||
|
||
while (parseDecimal(bundle.to_key) < height) { | ||
bundleId++; | ||
bundle = await this.getBundleById(bundleId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be a separate function. We can make it easy to switch between iterating and binary search if the functions have the same interface
packages/node/src/utils/kyve/kyve.ts
Outdated
} | ||
} | ||
|
||
await fs.promises.chmod(bundleFilePath, 0o666); // Reset permissions if polling exceeds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? What if the bundle is just taking a really long time to download?
packages/node/src/utils/kyve/kyve.ts
Outdated
// XXXX:SCOTT This can get stuck and not resolve, it seems a file can get stuck with permissions not reset (indexer restart) | ||
// Its probably worth adding a timeout on this function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can have a better comment explaining the timeout
packages/node/src/utils/kyve/kyve.ts
Outdated
.pipe(writeStream) | ||
.on('error', reject) | ||
.on('finish', async () => { | ||
await fs.promises.chmod(bundleFilePath, 0o444); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not move this out of the promise?
await this.downloadAndProcessBundle(bundle); | ||
return await this.readFromFile(bundleFilePath); | ||
} catch (e: any) { | ||
if (['EEXIST', 'EACCES'].includes(e.code)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth tying this array with the other occurrence?
packages/node/src/utils/kyve/kyve.ts
Outdated
for (const file of files) { | ||
if (this.isBundleFile(file)) { | ||
const id = parseDecimal(file.match(BUNDLE_FILE_ID_REG(this.poolId))[1]); | ||
bundles.push(await this.getBundleById(id)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be a map/promise.all instead
packages/node/src/utils/kyve/kyve.ts
Outdated
const currentBundle = await this.getBundleFromCache(height); | ||
if (!currentBundle) return []; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be removed
packages/node/src/utils/kyve/kyve.ts
Outdated
const isOutsiderBuffer = | ||
height < parseDecimal(b.from_key) - bufferSize || | ||
height > parseDecimal(b.to_key) + bufferSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think only bundles below the buffer matter, If we're doing a binary search or have any future bundles they will be removed from the cache
packages/node/src/utils/kyve/kyve.ts
Outdated
const blocks = await this.updateCurrentBundleAndDetails(height); | ||
const blockData = this.findBlockByHeight(height, blocks); | ||
assert(blockData, `Unable to retrieve block: ${height} from file cache.`); | ||
// XXXX:SCOTT blockData is regularly undefined, this should not happen and is not handled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be removed
With some testing:
|
|
||
const poolId = await KyveApi.fetchPoolId(chainId, lcdClient); | ||
|
||
logger.info(`Kyve API connected`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be handy to have logging when Kyve isn't being used too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried the latest and kept getting this error:
2024-04-25T23:11:33.863Z <worker #0> ERROR Uncaught Exception Error: ENOENT: no such file or directory, stat 'bundle_2_100.json'
/Users/scotttwiname/Projects/subql/node_modules/@subql/node-core/dist/indexer/worker/worker.js:130
throw e;
^
[Error: ENOENT: no such file or directory, stat 'bundle_2_100.json'] {
errno: -2,
code: 'ENOENT',
syscall: 'stat',
path: 'bundle_2_100.json'
}
Thrown at:
Node.js v20.11.1
(node:74466) WARNING: Exited the environment with code 7
at processPromiseRejections (node:internal/process/promises:289:13)
at processTicksAndRejections (node:internal/process/task_queues:96:32)
const isStale = async (file: string) => | ||
((await fs.promises.stat(file)).mode & 0o777).toString(8) === '200'; | ||
|
||
files |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a promise.all?
packages/node/src/utils/kyve/kyve.ts
Outdated
private async bundleIdIterator(height: number): Promise<number> { | ||
let bundle: BundleDetails; | ||
let bundleId: number; | ||
|
||
const cachedBundles = await this.getResolvedBundleDetails(); | ||
const nearestBundle = cachedBundles.filter( | ||
(b) => parseDecimal(b.to_key) < height, | ||
); | ||
|
||
bundleId = Math.max(...nearestBundle.map((b) => parseDecimal(b.id))) + 1; | ||
|
||
bundle = await this.getBundleById(bundleId); | ||
|
||
while (parseDecimal(bundle.to_key) < height) { | ||
bundleId++; | ||
bundle = await this.getBundleById(bundleId); | ||
} | ||
return parseDecimal(bundle.id); | ||
} | ||
|
||
private async getBundleId(height: number): Promise<number> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These names can be more similar, also having a JSDOC description how they differ would be handy
let bundleId: number; | ||
|
||
const cachedBundles = await this.getResolvedBundleDetails(); | ||
const nearestBundle = cachedBundles.filter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be undefined, please add a test
packages/node/src/utils/kyve/kyve.ts
Outdated
await this.downloadAndProcessBundle(bundle); | ||
return await this.readFromFile(bundleFilePath); | ||
} catch (e: any) { | ||
if (['EEXIST', 'EACCES', 'ENOENT'].includes(e.code)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does the ENOENT
case happen?
️✅ There are no secrets present in this pull request anymore.If these secrets were true positive and are still valid, we highly recommend you to revoke them. 🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request. |
Description
Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
Fixes # (issue)
Type of change
Please delete options that are not relevant.
Checklist